package com.comtom.soft.thrift.client;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.StringUtils;

/**
 * {@link ThriftServerProvider} that uses ZooKeeper as the "config" center and
 * the Apache Curator library to simplify ZooKeeper development.
 *
 * <p>The children of the service path ({@code /service} or {@code /service/version})
 * are watched through a {@link PathChildrenCache}; each child name is expected to
 * look like {@code host:port} or {@code host:port:serviceName}.
 *
 * <p>All access to the address collections ({@code trace}, {@code container},
 * {@code inner}) is guarded by the single monitor {@code lock}, because they are
 * written by the cache listener and read by callers of {@link #selector()} and
 * {@link #findServerAddressList()}.
 */
public class ThriftServerProviderZookeeper implements ThriftServerProvider, InitializingBean {

	private final Logger logger = LoggerFactory.getLogger(getClass());

	// Name of the registered service
	private String service;
	// Version of the service; when empty the service path has no version segment
	private String version;

	private PathChildrenCache cachedPath;

	private CuratorFramework zkClient;

	// Records every address (child node name) this provider has come across.
	// When the zookeeper cluster fails, the addresses in trace serve as a "backup".
	// Guarded by lock.
	private final Set<String> trace = new HashSet<String>();

	// Addresses currently known for the service. Guarded by lock.
	private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();

	// Queue that selector() polls from; refilled from container when drained. Guarded by lock.
	private final Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();

	private final Object lock = new Object();

	public void setService(String service) {
		this.service = service;
	}

	public void setVersion(String version) {
		this.version = version;
	}

	public ThriftServerProviderZookeeper() {
	}

	public ThriftServerProviderZookeeper(CuratorFramework zkClient) {
		this.zkClient = zkClient;
	}

	public void setZkClient(CuratorFramework zkClient) {
		this.zkClient = zkClient;
	}

	/**
	 * Starts the Curator client if it has not been started yet, then builds and
	 * starts the children cache for the service path.
	 *
	 * @throws IllegalStateException if no zkClient has been configured
	 * @throws Exception if the cache cannot be started
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		if (zkClient == null) {
			throw new IllegalStateException("zkClient must be set before initialization");
		}
		// Start zk if it has not been started yet
		if (zkClient.getState() == CuratorFrameworkState.LATENT) {
			zkClient.start();
		}
		buildPathChildrenCache(zkClient, getServicePath(), true);
		cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
	}

	/**
	 * Returns the watched path: {@code /service/version}, or {@code /service}
	 * when no version is configured.
	 */
	private String getServicePath() {
		if (!StringUtils.isEmpty(version)) {
			return "/" + service + "/" + version;
		}
		return "/" + service;
	}

	/**
	 * Creates the children cache for {@code path} and registers the listener that
	 * keeps the address collections in sync with the child nodes.
	 */
	private void buildPathChildrenCache(final CuratorFramework client, String path, boolean cacheData) throws Exception {
		cachedPath = new PathChildrenCache(client, path, cacheData);
		cachedPath.getListenable().addListener(new PathChildrenCacheListener() {
			@Override
			public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
				switch (event.getType()) {
				case CHILD_ADDED:
					addData(event.getData());
					break;
				case CHILD_UPDATED:
					// The address is encoded in the node name, so a data update changes nothing.
					break;
				case CHILD_REMOVED:
					removeData(event.getData());
					break;
				default:
					// Every other event type triggers a full reload.
					rebuild();
					break;
				}
			}
		});
	}

	/**
	 * Drops the current addresses and reloads them from the cache.
	 * The collections are cleared before the cache is rebuilt, so if the rebuild
	 * fails they stay empty and {@link #selector()} falls back to {@code trace}.
	 */
	private void rebuild() throws Exception {
		synchronized (lock) {
			container.clear();
			inner.clear();
		}
		// Not called under the lock: selector() must not wait on the cache reload.
		cachedPath.rebuild();
		List<ChildData> children = cachedPath.getCurrentData();
		if (children == null || children.isEmpty()) {
			logger.warn("thrift server-cluster error....");
			return;
		}
		for (ChildData data : children) {
			addData(data);
		}
	}

	/**
	 * Registers the address encoded in the child's node name, and remembers the
	 * node name in {@code trace}. Children whose name cannot be parsed are ignored.
	 */
	private void addData(ChildData data) {
		String node = nodeName(data);
		InetSocketAddress address = transfer(node);
		if (address == null) {
			return;
		}
		synchronized (lock) {
			// Recorded on every add (not only on rebuild) so that the backup list
			// also contains servers that joined after the last rebuild.
			trace.add(node);
			if (!container.contains(address)) {
				container.add(address);
			}
			if (!inner.contains(address)) {
				inner.add(address);
			}
		}
	}

	/**
	 * Removes the address encoded in the child's node name from the active
	 * collections. The entry in {@code trace} is kept on purpose.
	 */
	private void removeData(ChildData data) {
		InetSocketAddress address = transfer(nodeName(data));
		if (address == null) {
			return;
		}
		synchronized (lock) {
			container.remove(address);
			inner.remove(address);
		}
	}

	/**
	 * Extracts the node name, i.e. the part of the child's path that follows the
	 * service path and its trailing slash.
	 *
	 * @return the node name, or {@code null} when {@code data} is null or its path
	 *         is shorter than the service path prefix
	 */
	private String nodeName(ChildData data) {
		if (data == null) {
			return null;
		}
		String path = data.getPath();
		int prefixLength = getServicePath().length() + 1;
		if (path == null || path.length() < prefixLength) {
			return null;
		}
		return path.substring(prefixLength);
	}

	/**
	 * Parses {@code host:port} or {@code host:port:serviceName}.
	 *
	 * @return a {@link ServiceSocketAddress} when exactly three segments are present,
	 *         a plain {@link InetSocketAddress} otherwise, or {@code null} when the
	 *         text is empty, has fewer than two segments, or the port is not a
	 *         number in the range 0..65535
	 */
	private InetSocketAddress transfer(String address) {
		if (StringUtils.isEmpty(address)) {
			return null;
		}
		String[] hostname = address.split(":");
		if (hostname.length < 2) {
			return null;
		}
		String ip = hostname[0];
		Integer port;
		try {
			port = Integer.valueOf(hostname[1]);
		} catch (NumberFormatException e) {
			logger.warn("transfer address error.address={}", address);
			return null;
		}
		// InetSocketAddress rejects ports outside this range with an
		// IllegalArgumentException; treat them like any other malformed address.
		if (port < 0 || port > 0xFFFF) {
			logger.warn("transfer address error.address={}", address);
			return null;
		}
		if (hostname.length == 3) {
			String serviceName = hostname[2];
			return new ServiceSocketAddress(ip, port, serviceName);
		}
		return new InetSocketAddress(ip, port);
	}

	/**
	 * Returns an unmodifiable snapshot of the currently known addresses.
	 * A copy is returned so that callers can iterate it while the cache listener
	 * keeps modifying the underlying list.
	 */
	@Override
	public List<InetSocketAddress> findServerAddressList() {
		synchronized (lock) {
			return Collections.unmodifiableList(new ArrayList<InetSocketAddress>(container));
		}
	}

	/**
	 * Returns the next address from the queue, refilling the queue from the known
	 * addresses once it is drained. When no address is known, the addresses
	 * recorded in {@code trace} are restored (in shuffled order) first.
	 *
	 * @return the next address, or {@code null} when none is available
	 */
	@Override
	public InetSocketAddress selector() {
		synchronized (lock) {
			if (inner.isEmpty()) {
				if (container.isEmpty()) {
					// Nothing live is known: fall back to the backup records.
					for (String hostname : trace) {
						InetSocketAddress address = transfer(hostname);
						if (address != null) {
							container.add(address);
						}
					}
					Collections.shuffle(container);
				}
				inner.addAll(container);
			}
			return inner.poll();
		}
	}

	/**
	 * Closes the cache and the Curator client and forgets all addresses.
	 * Each resource is closed independently, so a failure in one does not prevent
	 * the remaining cleanup; failures are logged instead of propagated.
	 */
	@Override
	public void close() {
		if (cachedPath != null) {
			try {
				cachedPath.close();
			} catch (Exception e) {
				logger.warn("failed to close PathChildrenCache", e);
			}
		}
		if (zkClient != null) {
			try {
				zkClient.close();
			} catch (Exception e) {
				logger.warn("failed to close CuratorFramework", e);
			}
		}
		synchronized (lock) {
			trace.clear();
			container.clear();
			inner.clear();
		}
	}

	@Override
	public String getService() {
		return service;
	}

}
